[SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union #15567
Conversation
Test build #67258 has finished for PR 15567 at commit
Test build #67257 has finished for PR 15567 at commit
Test build #67260 has finished for PR 15567 at commit
```scala
/**
 * [performance] Spark's internal mapPartitions method that skips closure cleaning.
 */
private[spark] def mapPartitionsInternal[U: ClassTag](
```
can we get rid of this?
There are 20+ probably valid uses of `mapPartitionsInternal`. The main problem is that changing it to `mapPartitionsWithIndexInternal` doesn't really force people to initialize the partition.
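To illustrate the pattern under discussion — a minimal sketch, outside Spark, of why a `mapPartitionsWithIndex`-style API helps: the closure receives the partition index up front, so per-partition state can be seeded deterministically. `mapPartitionsWithIndexLike` and `PartitionSeededRand` are toy stand-ins, not Spark APIs:

```scala
// Illustrative only: each "partition" closure receives its index up front,
// so per-partition state can be seeded deterministically.
case class PartitionSeededRand(baseSeed: Long) {
  private var rng: scala.util.Random = _
  def initialize(partitionIndex: Int): Unit =
    rng = new scala.util.Random(baseSeed + partitionIndex)
  def eval(): Double = rng.nextDouble()
}

// Toy stand-in for RDD.mapPartitionsWithIndex over a Seq of "partitions".
def mapPartitionsWithIndexLike[T, U](
    partitions: Seq[Seq[T]])(f: (Int, Iterator[T]) => Iterator[U]): Seq[Seq[U]] =
  partitions.zipWithIndex.map { case (part, idx) => f(idx, part.iterator).toSeq }

val data = Seq(Seq(1, 2), Seq(3, 4))
val out = mapPartitionsWithIndexLike(data) { (idx, iter) =>
  val expr = PartitionSeededRand(baseSeed = 42L)
  expr.initialize(idx) // initialization happens before any row is evaluated
  iter.map(_ => expr.eval())
}
```

As the comment notes, though, nothing in the API itself forces the closure author to call `initialize` — it only makes the index available.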
```diff
@@ -274,12 +274,12 @@ trait Nondeterministic extends Expression {

   private[this] var initialized = false

-  final def setInitialValues(): Unit = {
-    initInternal()
+  final def initializeStatesForPartition(partitionIndex: Int): Unit = {
```
while you are at it, it'd be great to add some comments documenting the function ...
How about just naming this `initialize`? It is fairly long right now. And we just document that initialize must be called prior to task execution on a partition.
I don't want to overload the name `initialize`, which is a little vague. How about `initStates`? Again, the issue is that even with comments we cannot force users to initialize it.
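For illustration, here is a self-contained sketch of the contract being debated — a final entry point records the partition index, calls a protected hook, and flips an `initialized` flag that `eval` checks. The names `initializeStatesForPartition`, `initInternal`, and `initialized` follow the diff; the trait name and `FakeRand` are hypothetical:

```scala
// Standalone sketch (not Spark source) of the initialization contract.
trait NondeterministicLike {
  private[this] var initialized = false
  protected var partitionIndex: Int = -1

  final def initializeStatesForPartition(pIdx: Int): Unit = {
    partitionIndex = pIdx
    initInternal()
    initialized = true
  }

  protected def initInternal(): Unit

  final def eval(): Any = {
    // The guard cannot force callers to initialize, but it fails fast.
    require(initialized, "expression must be initialized before eval")
    evalInternal()
  }

  protected def evalInternal(): Any
}

// Hypothetical rand-like expression seeded per partition.
class FakeRand(seed: Long) extends NondeterministicLike {
  private var rng: scala.util.Random = _
  override protected def initInternal(): Unit =
    rng = new scala.util.Random(seed + partitionIndex)
  override protected def evalInternal(): Any = rng.nextDouble()
}
```

The `require` in `eval` is the "assert if not initialized" idea raised later in this review: forgetting to initialize becomes an immediate error rather than silently wrong values.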
```scala
 */
val partitionInitializationStatements: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty

def addPartitionInitializationStatement(statement: String): Unit = {
```
Any reason you are creating this rather than just using `addMutableState`?
I'm a little worried about introducing more issues by moving `initMutableStates` out from the constructor. The current implementation at least maintains the existing behavior if we missed `initializeStatesForPartition` somewhere.
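The trade-off here can be sketched with a toy codegen context (this is not Spark's actual `CodegenContext`, just an illustration): mutable-state initializers run in the generated class's constructor, while partition-initialization statements are emitted into a separate method that must be called once per partition.

```scala
// Toy codegen context: constructor-time init vs. per-partition init.
class ToyCodegenContext {
  val mutableStateInitializers =
    scala.collection.mutable.ArrayBuffer.empty[String]
  val partitionInitializationStatements =
    scala.collection.mutable.ArrayBuffer.empty[String]

  def addPartitionInitializationStatement(statement: String): Unit =
    partitionInitializationStatements += statement

  // Assemble the generated Java source as a string.
  def generate(): String =
    s"""public SpecificProjection() {
       |  ${mutableStateInitializers.mkString("\n  ")}
       |}
       |public void initialize(int partitionIndex) {
       |  ${partitionInitializationStatements.mkString("\n  ")}
       |}""".stripMargin
}
```

Keeping constructor-time initialization intact means an expression that misses the per-partition call still behaves as before, which is the safety argument made above.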
```diff
@@ -274,12 +274,12 @@ trait Nondeterministic extends Expression {

   private[this] var initialized = false
```
Should we change this to `@transient`? Then it will always get reset to `false` on a new partition.
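The suggestion relies on standard JVM serialization behavior: a `@transient` field is not written out, so it comes back as its type's default (`false` for `Boolean`) when the object is deserialized on an executor. A minimal sketch, with an illustrative `Expr` class and round-trip helper:

```scala
import java.io._

// A @transient var is skipped during serialization, so it resets to its
// default value (false) after deserialization.
class Expr extends Serializable {
  @transient private var initialized = true
  def isInitialized: Boolean = initialized
}

// Serialize to bytes and read back, simulating driver -> executor shipping.
def roundTrip[T <: Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  in.readObject().asInstanceOf[T]
}
```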
will do
```diff
 */
@ExpressionDescription(
-  usage = "_FUNC_() - Returns the current partition id of the Spark task",
+  usage = "_FUNC_() - Returns the current partition id",
```
Hmm, this is behavior-changing, and there is some value to the old partition id ...
I'd consider introducing a new expression for the proper id and leave the old one as is.
I thought about this. But I don't think the current behavior is the expected behavior from users. This is the same issue as in `monotonically_increasing_id`.
Yea, but it is consistent with `TaskContext.partitionId` (which is also the name of the function).
The name is `SparkPartitionID`, not `TaskContextPartitionID`. We should follow the same semantics for non-deterministic expressions.
```scala
 * This is used by non-deterministic expressions to set initial states.
 * The default implementation does nothing.
 */
def initializeStatesForPartition(partitionIndex: Int): Unit = {}
```
To make this safer, I'd create an internal variable `isInitialized`, similar to the one in `Nondeterministic` expressions, and assert in `eval` if `isInitialized` is false.
I didn't test. Would doing that hurt the performance?
I don't think so, since it is in the interpreted path which is already very slow. Also in the normal case the condition will always be false, so CPU branch prediction should work its magic.
Reviewing this code makes me realize how painful it is when project/filter are just Scala functions ... it'd be much easier to review if they had methods defined (e.g. eval, or execute) ...
So my biggest question is whether you've changed all the places to call initialize where projection/predicate are used.
Test build #67270 has finished for PR 15567 at commit
Test build #67277 has finished for PR 15567 at commit
@mengxr - I think this PR will also address SPARK-14241.
SPARK-14241 doesn't just occur with union and coalesce, it also occurs with filter and probably other operations. Hopefully this PR will address all of those situations. I strongly agree with the expected semantic in the original PR message by mengxr - this has bitten me on multiple occasions.
@rxin I updated the implementation to force initialization in Projection/Expression. This will fail many tests. I fixed all in
Basically, we assume non-deterministic by default unless marked as deterministic. This will require updating all expressions but makes the code less messy.
```scala
 * Trait for expressions that require initialization based on the partition index prior to task
 * execution on a partition.
 */
trait PartitionDependent {
```
As discussed offline, this is `StatefulExpression`.
Actually, `Projection` also needs the same trait. Shall we call it `PartitionedStateful`?
Test build #67907 has finished for PR 15567 at commit
I reverted the changes I made to enforce
Renamed
Test build #67957 has finished for PR 15567 at commit
Test build #67959 has finished for PR 15567 at commit
Merging in master/branch-2.1. Thanks.
[SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union

## What changes were proposed in this pull request?

When a user appended a column using a "nondeterministic" function to a DataFrame, e.g., `rand`, `randn`, and `monotonically_increasing_id`, the expected semantic is the following:

- The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations.

However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values from nondeterministic columns might change if we call `union` or `coalesce` after. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column. See the unit tests below or JIRA for examples.

This PR uses the partition index from `RDD.mapPartitionWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and initialized right after object creation in `mapPartitionWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function for proper initialization.

## How was this patch tested?

Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...)

cc: rxin davies

Author: Xiangrui Meng <[email protected]>

Closes #15567 from mengxr/SPARK-14393.

(cherry picked from commit 02f2031)
Signed-off-by: Reynold Xin <[email protected]>
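The semantic bug fixed by this PR can be modeled outside Spark. In the sketch below (toy code, not Spark's implementation), a rand column is seeded from a partition id. Deriving that id from the *task* (which after `coalesce(1)` is 0 for everything) changes the values, while carrying the *original* partition index with the closure keeps them fixed:

```scala
// Toy model of the bug: a rand column seeded from a partition id.
def randColumn(partitionIndex: Int, rows: Seq[Int], seed: Long): Seq[Double] = {
  val rng = new scala.util.Random(seed + partitionIndex)
  rows.map(_ => rng.nextDouble())
}

val partitions = Seq(Seq(1, 2), Seq(3, 4))

// Values as first materialized: original partition indices 0 and 1.
val before = partitions.zipWithIndex.map { case (p, i) => randColumn(i, p, 42L) }

// Buggy behavior after "coalesce(1)": TaskContext.getPartitionId returns 0
// for every original partition, so partition 1's values change.
val buggyAfter = partitions.map(p => randColumn(0, p, 42L))

// Fixed behavior: the original index travels via mapPartitionsWithIndex.
val fixedAfter = partitions.zipWithIndex.map { case (p, i) => randColumn(i, p, 42L) }
```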
```scala
val predicate = newPredicate(condition, child.output)
predicate.initialize(0)
```
Just wondering why `FilterExec` is not using `index` to initialize the conditions?